In [1]:
from typing import *

import pandas as pd
import numpy as np
from pykalman import KalmanFilter
import plotly.graph_objects as go
In [2]:
# Input location and the column names used by every later cell.
data_dir = './data'
file_path = f'{data_dir}/NOK.csv'

date_col = 'Date'
price_col = 'Adj Close'
In [3]:
# Load the raw price history from the CSV file at `file_path`.
data = pd.read_csv(file_path)
In [4]:
# Summarise column dtypes, non-null counts and memory usage.
data.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6446 entries, 0 to 6445
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Date       6446 non-null   object 
 1   Open       6446 non-null   float64
 2   High       6446 non-null   float64
 3   Low        6446 non-null   float64
 4   Close      6446 non-null   float64
 5   Adj Close  6446 non-null   float64
 6   Volume     6446 non-null   int64  
dtypes: float64(5), int64(1), object(1)
memory usage: 352.6+ KB
In [5]:
# Parse the date strings (read as `object` dtype) into datetime values.
data[date_col] = pd.to_datetime(data[date_col])
In [6]:
# Keep only rows dated 2000-01-01 or later and renumber the index from 0.
data = data[data[date_col] >= '2000-01-01'].reset_index(drop=True)
In [7]:
# Preview the first rows of the filtered frame.
data.head()
Out[7]:
Date Open High Low Close Adj Close Volume
0 2000-01-03 48.125000 48.250000 45.5000 46.75 25.261276 20275200
1 2000-01-04 44.250000 45.140625 43.1875 43.25 23.370058 19410000
2 2000-01-05 42.046875 43.625000 40.1875 42.50 22.964800 25034000
3 2000-01-06 42.000000 42.750000 38.2500 38.25 20.668320 19321600
4 2000-01-07 40.437500 43.000000 40.4375 42.75 23.099880 18058800
In [8]:
# Preview the last rows of the filtered frame.
data.tail()
Out[8]:
Date Open High Low Close Adj Close Volume
5178 2020-08-03 4.96 5.13 4.94 5.06 5.06 62489200
5179 2020-08-04 5.04 5.14 5.01 5.10 5.10 42735100
5180 2020-08-05 5.07 5.10 4.99 5.00 5.00 36336500
5181 2020-08-06 4.97 5.01 4.95 5.00 5.00 30262600
5182 2020-08-07 4.95 4.99 4.93 4.98 4.98 21392500

1 Plot

In [9]:
# Baseline chart: the raw price series over time.
fig = go.Figure(
    data=[go.Scatter(x=data[date_col], y=data[price_col], name='input_data')],
    layout=go.Layout(
        width=1000,
        height=500,
        xaxis_title=date_col,
        yaxis_title=price_col,
    ),
)
fig.show("notebook")

2 Time series smoothing

In [10]:
def exponential_smoothing(series: Sequence[float], alpha: float) -> List[float]:
    """Apply simple exponential smoothing to a one-dimensional series.

    The first output equals the first input; every following output is
    ``alpha * current_value + (1 - alpha) * previous_output``.

    Args:
        series: Ordered numeric values (a list or a 1-D numpy array).
        alpha: Weight given to the newest observation. Larger values follow
            the input more closely, smaller values smooth more strongly.

    Returns:
        A list with one smoothed value per input element. An empty input
        yields an empty list.
    """
    # `len(...) == 0` rather than `not series`: the truth value of a numpy
    # array with more than one element is ambiguous and raises.
    if len(series) == 0:
        return []

    result = [series[0]]
    for value in series[1:]:
        result.append(alpha * value + (1 - alpha) * result[-1])
    return result

# Smoothing parameters: weight of the newest observation for exponential
# smoothing, and the moving-average window length in rows.
alpha = 0.08
window_size = 30

# Scalar Kalman model: the state is carried over unchanged (transition = 1)
# and observed directly (observation = 1).
kf = KalmanFilter(
    initial_state_mean=data[price_col].values[0],
    initial_state_covariance=1,
    transition_matrices=1,
    transition_covariance=0.05,
    observation_matrices=1,
    observation_covariance=3,
)

# One new column per smoothing method, added in this order.
data['exponential_smoothing'] = exponential_smoothing(data[price_col].values, alpha)
data['mean_rolling'] = data[price_col].rolling(window_size).mean()
# `smooth` returns (state means, state covariances); only the means are kept.
data['kalman_smoothing'] = kf.smooth(data[price_col].values)[0]
In [11]:
# Overlay each smoothed column on the raw-price figure; the legend entry of
# every line is the name of the column it shows.
for column in ('kalman_smoothing', 'mean_rolling', 'exponential_smoothing'):
    fig = fig.add_trace(
        go.Scatter(x=data[date_col], y=data[column], name=column)
    )

fig.show("notebook")

3 Increasing and decreasing intervals

In [12]:
def find_intervals(series, epsilon=0.019) -> List[float]:
    """Label every point of ``series`` as part of a rising or falling interval.

    The first difference of ``series`` is used as a discrete derivative.
    Points whose derivative lies strictly inside ``(-epsilon, epsilon)`` are
    treated as stationary points. Each stationary point closes the current
    interval (and belongs to it); the whole interval is then labelled with
    the sign of the derivative summed over it. The last point has no known
    derivative and always closes the final interval.

    The number of stationary points found is printed as a side effect.

    Args:
        series: One-dimensional sequence of numeric values.
        epsilon: Half-width of the band around zero inside which a
            derivative counts as stationary.

    Returns:
        A list with one label per element of ``series``: ``1`` for an
        increasing interval, ``-1`` for a decreasing one and ``0`` when the
        summed derivative of the interval is exactly zero. An empty input
        yields an empty list.
    """
    values = np.asarray(series, dtype=float)
    if values.size == 0:
        return []

    # Differentiate the function argument itself rather than a notebook-level
    # global, so the result depends only on what the caller passed in.
    derivative = np.diff(values)
    derivative = np.concatenate((derivative, (np.nan,)))  # unknown derivative value for last day

    # Comparisons with NaN are False, so the last day is never stationary.
    is_stationary = (derivative > -epsilon) & (derivative < epsilon)
    print(f"Stationary poings number = {np.count_nonzero(is_stationary)}")

    intervals: List[float] = []
    derivative_sum = 0.0  # accumulated derivative, used for the interval sign
    count = 0             # number of points collected in the current interval
    last_index = len(derivative) - 1

    for i, deriv in enumerate(derivative):
        count += 1
        # skip the unknown derivative of the last day
        if not np.isnan(deriv):
            derivative_sum += deriv

        if is_stationary[i] or i == last_index:
            # fill the finished interval with 1 (increasing) or -1 (decreasing)
            intervals.extend([np.sign(derivative_sum)] * count)
            count = 0
            derivative_sum = 0.0

    return intervals
In [13]:
# Column holding the offline Kalman-smoothed prices; reused by later plots.
smooth_col = 'kalman_smoothing'
# One label per row: 1 inside an increasing interval, -1 inside a decreasing one.
intervals = find_intervals(data[smooth_col].values)
Stationary poings number = 2540
In [14]:
# Smoothed prices together with the +1 / -1 interval labels on the same axes.
fig2 = go.Figure(
    layout=go.Layout(
        width=1000,
        height=500,
        xaxis_title=date_col,
        yaxis_title=price_col,
    )
)
fig2.add_trace(go.Scatter(x=data[date_col], y=data[smooth_col], name='smoothed_data'))
fig2.add_trace(go.Scatter(x=data[date_col], y=intervals, name='grow_intervals'))
fig2.show("notebook")

4 Real-time time series filtering

In [15]:
%%time
# Same scalar Kalman model as used for offline smoothing; its initial state
# is overwritten for every window inside the loop below.
kf = KalmanFilter(
    initial_state_mean=data[price_col].values[0],
    initial_state_covariance=1,
    transition_matrices=1,
    transition_covariance=0.05,
    observation_matrices=1,
    observation_covariance=3,
)

series = data[price_col].values
window_size = 30
n_iter = series.shape[0] - window_size + 1
# Rows that do not yet have a full window behind them stay NaN.
series_smoothed_by_window = np.full(series.shape, np.nan)

for window_start in range(n_iter):
    window_end = window_start + window_size  # exclusive end of the window
    window = series[window_start:window_end]

    # Restart the filter from the first price of the current window.
    kf.initial_state_mean = window[0]
    kf.initial_state_covariance = 3

    # Forward filtering only: the value stored for the window's last day
    # uses no observations that come after that day.
    filtered_means = kf.filter(window)[0].squeeze()
    series_smoothed_by_window[window_end - 1] = filtered_means[-1]

realtime_col = 'realtime_filter'
data[realtime_col] = series_smoothed_by_window
Wall time: 39.3 s
In [16]:
# Raw prices, the offline smoother and the windowed real-time filter together.
fig3 = go.Figure(
    data=[
        go.Scatter(x=data[date_col], y=data[column], name=label)
        for column, label in (
            (price_col, 'source_data'),
            (smooth_col, smooth_col),
            (realtime_col, f"{realtime_col} (kalman)"),
        )
    ],
    layout=go.Layout(
        width=1000,
        height=500,
        xaxis_title=date_col,
        yaxis_title=price_col,
    ),
)
fig3.show("notebook")

5 Real-time filtering for last 30 days

In [17]:
# Zoom in on the most recent rows to compare the three series in detail.
n_days = 30
data_tail = data.iloc[-n_days:]

fig4 = go.Figure(
    layout=go.Layout(
        width=1000,
        height=500,
        xaxis_title=date_col,
        yaxis_title=price_col,
    )
)
fig4.add_traces([
    go.Scatter(x=data_tail[date_col], y=data_tail[price_col], name='source_data'),
    go.Scatter(x=data_tail[date_col], y=data_tail[smooth_col], name='smoothed_data'),
    go.Scatter(x=data_tail[date_col], y=data_tail[realtime_col],
               name=f"{realtime_col} (kalman)"),
])
fig4.show("notebook")

As the plot shows, real-time filtering with a 30-day rolling window returns smoothed values that lag behind the source data.

Animation

In [18]:
df = data_tail

# Initial state of the animation: only the first two points of each line.
trace1 = go.Scatter(x=df[date_col].iloc[:2],
                    y=df[price_col].iloc[:2],
                    mode='lines',
                    name='Source data',
                    line=go.scatter.Line(width=2))
trace2 = go.Scatter(x=df[date_col].iloc[:2],
                    y=df[realtime_col].iloc[:2],
                    mode='lines',
                    name='Filtered data',
                    line=go.scatter.Line(width=2))

# Frame k redraws both lines with the first k + 1 rows. The range runs up to
# len(df) (exclusive) so that the final frame contains every row, including
# the most recent day.
frames = [
    dict(
        data=[
            dict(
                type='scatter',
                x=df[date_col].iloc[:k + 1],
                y=df[price_col].iloc[:k + 1]),
            dict(
                type='scatter',
                x=df[date_col].iloc[:k + 1],
                y=df[realtime_col].iloc[:k + 1]),
        ],
        traces=[0, 1],  # frame data updates trace1 and trace2, in that order
    ) for k in range(1, len(df))
]

layout = go.Layout(
    width=1000,
    height=500,
    showlegend=True,
    hovermode='x unified',
    updatemenus=[
        dict(
            type='buttons',
            showactive=False,
            y=0.05,
            x=1.05,
            xanchor='left',
            yanchor='top',
            pad=dict(t=0, r=10),
            buttons=[
                dict(
                    label='Play animation',
                    method='animate',
                    args=[
                        None,  # None = play all frames
                        dict(
                            frame=dict(
                                duration=150,
                                redraw=True),
                            transition=dict(duration=0),
                            fromcurrent=True,
                            mode='immediate')
                    ]
                )
            ]
        )
    ])

# Fix both axes up front so they do not rescale while the lines grow. The
# y-range covers both plotted columns so that neither line can be clipped.
y_values = df[[price_col, realtime_col]]
y_min = y_values.min().min()
y_max = y_values.max().max()
layout.update(xaxis=dict(range=[df[date_col].min(), df[date_col].max()], autorange=False),
              yaxis=dict(range=[y_min, y_max], autorange=False));
fig5 = go.Figure(data=[trace1, trace2], frames=frames, layout=layout)
fig5.show("notebook")
In [ ]: